25 断线续传、数据缓存与优化
断线续传、数据缓存与优化
关联:索引
要解决的问题
- 为什么“自动重连做完了”仍然会丢指令:断线期间用户点了 5 次按钮,恢复后只到 1 次,剩下去哪了?
- 为什么“补发”会变成“重复执行”:同一条控制指令被补发 3 次,机械臂执行了 3 次,谁应该保证幂等?
- 断线续传到底“续”的是什么:续连接?续消息?续状态?如何把这三者拆开设计?
- 缓存放哪里才靠谱:内存、localStorage、IndexedDB 各自适合什么量级?如何做限长与过期避免越用越慢?
- 为什么数据越多越卡:WebSocket 收得很快但页面渲染越来越慢,瓶颈在哪里(网络 / JSON / 响应式渲染)?
- “压缩”和“加密”是一回事吗:gzip、base64、WSS、Token、签名各解决什么问题,分别应该放在哪一层?
章节内容(本讲核心):
- 断线期间消息缓存:Outbox(待发送队列)与 Inbox(已接收序列)拆分
- 重连后补发:ACK 确认、去重(幂等)、窗口限流(避免“补发风暴”)
- 数据压缩:能力协商、压缩/解压的工程化封装(含降级策略)
- 通信安全基础:WSS、Token、Origin 检查、消息大小限制与简单防滥用
- 性能优化思路:批量发送、节流渲染、环形缓冲、丢弃策略与可观测性指标
与前置知识衔接(避免重复):
- 已学:WebSocket 生命周期事件、JSON Envelope、心跳与断线检测、自动重连(退避 + 抖动)、基础联调与异常测试思路
- 本讲配套项目:
12_websocket_resume/(可以理解为把上一讲11_websocket_reconnect/复制一份后做升级),本讲所有代码都组织在该项目中,不要求在上一讲项目上直接改 - 本讲不重复:重连机制从零实现、readyState 细讲、FastAPI WebSocket 从零搭建
- 本讲新增:可靠发送层(缓存 + 续传 + ACK + 去重)、压缩与安全的“最小可用工程化”
项目工坊(本讲交付):
- 在既有自动重连基础上,实现“消息队列 + 断线续传”(客户端 outbox + 服务端 ACK/去重 + 可选 server push 补发)
学生任务(当堂必做):
-
完成消息队列(含持久化、限长、过期)与 ACK 处理
-
完成自动重连 + 断线续传联调:断网/重启/抖动场景下稳定收发
-
AI:生成一套“断线续传协议草案 + 客户端/服务端关键代码”
-
学生:把方案落地到项目并做异常用例验证,对结果负责
作业(布置):
- 实现自动重连 + 断线续传。
- 设计 3 个异常测试用例并验证通过。
- 撰写测试报告。
从上一讲项目到本讲目标(过渡说明)
上一讲《WebSocket 异常处理与断线检测》中,我们已经在 11_websocket_reconnect/ 工程里完成了:
-
前端:
12_websocket_resume/client/src/composables/useReliableReconnectingWebSocket.ts+ReconnectPanel.vue,具备自动重连、心跳检测、online/offline 监听与详细日志,并已内置 Outbox/ACK 的续传能力 -
后端:上一讲的
11_websocket_reconnect/server/app.py是type=system/ping/pong/chat的最小可用服务端;本讲在12_websocket_resume/server/app.py中加入 ACK/去重等可靠性能力 -
测试能力:可以通过 DevTools Offline / 断网 / 重启服务端,观察状态切换与重连过程
-
客户端:在
12_websocket_resume中引入 Outbox + ReliableSender,并封装为useReliableReconnectingWebSocket(业务侧只管“可靠发送”) -
服务端:在
12_websocket_resume/server/app.py上加入 ACK 与去重逻辑
- 先把
12_websocket_resume/跑起来,确认重连与日志都正常; - 按本讲的 Outbox → ReliableSender → ACK 三个小步骤,逐步把“自动重连”升级为“断线可续传”;
本讲统一口径:断线续传的最小协议(可落地、可扩展)
本讲把“续传”拆成两条链路(你必须分开设计,否则一定混乱):
- 客户端 → 服务端:可靠发送(Outbox + ACK + 去重)
- 服务端 → 客户端:可靠推送(可选,Inbox + 补发)
1) 顶层 Envelope(本讲最小版)
{
"schema_version": "1.0.0",
"client_id": "web-01",
"msg_id": "M-1776200000123-000001",
"ts_ms": 1776200000123,
"event": "client.msg",
"seq": 1,
"payload": {
"action": "arm.stop",
"params": {
"reason": "user_click"
}
}
}
逐项解释与自检要点:
schema_version:协议版本;断线续传是长期能力,建议一开始就加版本字段。client_id:客户端身份标识;用于服务端维护“每个客户端的已处理序列号”。msg_id:消息唯一 ID;用于日志定位与排错(不要只靠 seq)。ts_ms:统一毫秒时间戳;用于过期策略与诊断。event:消息类型;本讲用client.msg / server.ack / client.hello / server.push等固定值。seq:客户端单调递增序列号(可靠发送的核心);服务端以它做去重与 ACK。payload:业务体;断线续传不关心业务细节,但必须原样保留。- 补充 1:
client_id必须“稳定”(刷新/重连不变),否则服务端会把你当成新客户端,续传会断档。 - 补充 2:
seq也必须“稳定”(要持久化);如果刷新就从 1 重新开始,会触发服务端去重逻辑,导致新消息被误判为重复。 - 补充 3:
payload建议只放可 JSON 化的数据(纯对象/数组/字符串/数字/布尔),不要直接塞函数/DOM/循环引用对象。
2) ACK(服务端回执,确认“已处理到哪里”)
{
"schema_version": "1.0.0",
"client_id": "web-01",
"ts_ms": 1776200000456,
"event": "server.ack",
"ack_seq": 12,
"ack_msg_id": "M-1776200000123-000012"
}
逐项解释与自检要点:
ack_seq:服务端确认“已处理到的最大 seq”;客户端可以删除seq <= ack_seq的 outbox。ack_msg_id:可选,但强烈建议保留;当你遇到“ack_seq 跳变/重复”时,能快速定位是哪条消息触发的。- 补充 1:服务端可以对“重复消息/跳号消息”也回 ACK(告诉客户端目前进度),客户端据此决定继续补发或等待缺口。
- 补充 2:客户端处理 ACK 时要保证
ack_seq单调递增(回退的 ACK 要忽略),避免误删未确认消息。
3) Hello(重连后,先同步游标,再补发)
{
"schema_version": "1.0.0",
"client_id": "web-01",
"ts_ms": 1776200000789,
"event": "client.hello",
"resume": {
"last_acked_seq": 12,
"want_server_resume": true,
"last_server_seq": 50
},
"cap": {
"compression": ["gzip"]
}
}
逐项解释与自检要点:
last_acked_seq:客户端本地记录的“服务端已确认到哪里”;用于补发范围判断。cap.compression:能力协商;不要强行启用压缩,先明确双方都支持。- 补充 1:hello 的目的不是“发业务”,而是“先把双方状态对齐”(尤其是服务端游标),避免一连上就盲目补发。
- 补充 2:cap 是“声明能力”而非“强制启用”;协商结果要能降级(例如不支持 gzip 时退回 none)。
一、核心理论:把“可靠性”变成可计算的状态
本讲只做“最小可靠”但必须具备 4 个点:
- 客户端有 Outbox:断线期间消息进入队列(而不是直接丢掉)
- 每条消息有 seq:可排序、可续传、可判断缺口
- 服务端有 ACK:明确告诉客户端“我处理到哪里了”
- 服务端能去重:同一 seq/同一 msg_id 重复到达不会重复执行业务
推荐的消息流(客户端 → 服务端):
- 客户端生成消息(seq+1)→ 写入 outbox(持久化)→ 尝试发送。
- 服务端收到消息:
- 如果
seq <= last_processed_seq:判定为重复,直接回 ACK(幂等)。 - 如果
seq == last_processed_seq + 1:执行业务,再更新游标并回 ACK。
- 客户端收到
ack_seq:删除seq <= ack_seq的 outbox 项,并更新last_acked_seq。
二、实操:客户端 Outbox(队列 + 持久化 + 限长 + 过期)
12_websocket_resume/client/src/composables/outbox.ts
1) Outbox 类型与持久化工具
// ClientMsg:客户端发送给服务端的“可靠消息”顶层结构
// 约束点:
// - 字段名固定(跨端统一口径)
// - event 固定为 client.msg(服务端用它分支处理)
// - seq 单调递增(断线续传的核心)
export type ClientMsg<TPayload extends Record<string, unknown>> = {
schema_version: '1.0.0' // 协议版本:未来升级时用于兼容判断
client_id: string // 客户端身份:服务端按它维护 last_processed_seq
msg_id: string // 消息唯一 ID:用于排查问题(不要只靠 seq)
ts_ms: number // 客户端生成时间:用于 TTL/诊断
event: 'client.msg' // 消息类型:本讲可靠消息固定为 client.msg
seq: number // 客户端序列号:从 1 开始递增
payload: TPayload // 业务体:可靠性层不关心内容,但必须原样传递
}
// OutboxItem:Outbox 内部持久化的条目
// 设计点:
// - json 直接存序列化后的文本,避免 localStorage 持久化时出现循环引用
export type OutboxItem = {
seq: number // 用于排序/补发
msg_id: string // 用于日志定位
ts_ms: number // 用于过期清理
json: string // 完整的 ClientMsg 序列化结果
}
// OutboxSnapshot:Outbox 的整体快照(一次性写入 localStorage)
// 关键点:
// - last_acked_seq 是“服务端确认游标”,必须单调递增
// - next_seq 是“下一条要分配的序号”,不要轻易回退,否则会导致服务端误判重复
export type OutboxSnapshot = {
client_id: string
last_acked_seq: number
next_seq: number
items: OutboxItem[]
}
// 统一毫秒时间戳
export function nowMs(): number {
return Date.now()
}
// 消息 ID 生成策略(课堂版):时间戳 + 6 位序号
export function makeMsgId(tsMs: number, seq: number): string {
const seq6 = String(seq).padStart(6, '0')
return `M-${tsMs}-${seq6}`
}
// 容错 JSON 解析:localStorage 里有脏数据也不应把页面搞崩
export function safeParseJson<T>(text: string): T | null {
try {
return JSON.parse(text) as T
} catch {
return null
}
}
逐段解释与自检要点:
ClientMsg:把可靠发送需要的字段固定住(schema_version/client_id/msg_id/ts_ms/event/seq)。OutboxItem.json:存储序列化后的 JSON 字符串,避免持久化时出现循环引用或丢字段。OutboxSnapshot:把 outbox 的“游标(last_acked_seq/next_seq)”与“队列 items”统一存储,重启后可恢复。safeParseJson:持久化读取要可失败且不崩溃(否则 localStorage 一脏就全站白屏)。
2) Outbox 实现:限长 + 过期 + 断电恢复
export type OutboxOptions = {
storage_key: string // localStorage key:按 clientId 隔离
max_items: number // 队列最多保留多少条(防止无限增长)
ttl_ms: number // 单条消息的生存时间(毫秒)
max_item_bytes: number // 单条消息最大字节数(UTF-8),防止超大对象写爆存储
}
export class Outbox {
private snapshot: OutboxSnapshot
private opt: OutboxOptions
constructor(clientId: string, opt?: Partial<OutboxOptions>) {
this.opt = {
storage_key: `ws_outbox_${clientId}`,
max_items: 300,
ttl_ms: 30 * 60 * 1000,
max_item_bytes: 256 * 1024,
...opt
}
// 启动时尝试从 localStorage 恢复快照;失败则使用新快照
const loaded = this.load()
this.snapshot =
loaded ??
({
client_id: clientId,
last_acked_seq: 0,
next_seq: 1,
items: []
} satisfies OutboxSnapshot)
this.gc()
this.persist()
}
get clientId(): string {
return this.snapshot.client_id
}
get lastAckedSeq(): number {
return this.snapshot.last_acked_seq
}
// ACK 游标只允许前进(回退会导致客户端重复补发旧消息)
setLastAckedSeq(ackSeq: number) {
if (!Number.isFinite(ackSeq) || ackSeq < 0) return
if (ackSeq <= this.snapshot.last_acked_seq) return
this.snapshot.last_acked_seq = ackSeq
// 删除所有“已被服务端确认处理”的消息
this.snapshot.items = this.snapshot.items.filter((x) => x.seq > ackSeq)
this.persist()
}
enqueue<TPayload extends Record<string, unknown>>(payload: TPayload): OutboxItem {
const ts = nowMs()
const seq = this.snapshot.next_seq
const msg: ClientMsg<TPayload> = {
schema_version: '1.0.0',
client_id: this.snapshot.client_id,
msg_id: makeMsgId(ts, seq),
ts_ms: ts,
event: 'client.msg',
seq,
payload
}
// 先序列化再入队:保证“断电/刷新”后仍可恢复补发
const json = JSON.stringify(msg)
const bytes = new TextEncoder().encode(json).length
if (bytes > this.opt.max_item_bytes) {
// 超大消息直接拒绝入队:避免 localStorage 写爆导致整站异常
throw new Error(`outbox item too large: ${bytes} > ${this.opt.max_item_bytes}`)
}
const item: OutboxItem = { seq, msg_id: msg.msg_id, ts_ms: ts, json }
this.snapshot.items.push(item)
this.snapshot.next_seq = seq + 1
// 入队后做清理(过期/限长/已确认),并持久化
this.gc()
this.persist()
return item
}
// 返回“待补发”的消息列表(按 seq 升序)
listPending(): OutboxItem[] {
const ack = this.snapshot.last_acked_seq
return this.snapshot.items.filter((x) => x.seq > ack).sort((a, b) => a.seq - b.seq)
}
private load(): OutboxSnapshot | null {
const raw = localStorage.getItem(this.opt.storage_key)
if (!raw) return null
const parsed = safeParseJson<OutboxSnapshot>(raw)
if (!parsed) return null
// 课堂最小实现只做最轻量的校验;更严谨可以逐字段校验结构
return parsed
}
private gc() {
const now = nowMs()
// 1) 过期清理:超过 ttl_ms 的条目直接丢弃
this.snapshot.items = this.snapshot.items.filter((x) => now - x.ts_ms <= this.opt.ttl_ms)
// 2) 限长清理:只保留最近 max_items 条(丢弃最旧)
if (this.snapshot.items.length > this.opt.max_items) {
const keep = this.snapshot.items.slice(this.snapshot.items.length - this.opt.max_items)
this.snapshot.items = keep
}
// 3) 再次按 ACK 游标删除已确认消息(双保险)
const ack = this.snapshot.last_acked_seq
this.snapshot.items = this.snapshot.items.filter((x) => x.seq > ack)
}
private persist() {
localStorage.setItem(this.opt.storage_key, JSON.stringify(this.snapshot))
}
}
逐段解释与自检要点:
-
max_items/ttl_ms:必须有,否则断线期间积压会无限增长,最终卡死或写爆存储。 -
enqueue:先写入 outbox 再发送;这样即使“写完就断电”,重启也能补发。 -
setLastAckedSeq:只允许 ack 前进;同时删除seq <= ackSeq的历史项,避免重复补发。 -
listPending:确保按 seq 升序补发,避免乱序导致的业务副作用。 -
断网期间点击按钮 20 次 → localStorage 里能看到 outbox 增长但不会无限增长。
-
刷新页面(F5)→ outbox 不丢,重连后仍能补发。
三、实操:可靠发送层(连接打开就补发,收到 ACK 就删除)
下面代码不实现重连本身(复用上一讲的自动重连/心跳),只实现“可靠发送层”核心逻辑:Outbox + flush + ACK。
推荐在客户端工程中新建一个组合式工具文件:
12_websocket_resume/client/src/composables/reliableSender.ts
export type ServerAck = {
schema_version: '1.0.0'
client_id: string // ACK 属于哪个客户端(防止多个 clientId 混淆)
ts_ms: number // 服务端生成时间戳
event: 'server.ack' // 事件名固定
ack_seq: number // 服务端已处理到的最大 seq(可靠性核心字段)
ack_msg_id?: string // 可选:用于日志定位
}
export type WsLike = {
send: (text: string) => void // WebSocket.send
readyState: number // WebSocket.readyState(1 表示 OPEN)
}
// ReliableSender:把 Outbox + ACK 组合成“可靠发送”能力
// 核心点:
// - outbox 负责存储“未确认消息”
// - inFlight + windowSize 负责“补发限流”,避免断线积压后一次性打爆服务端
export class ReliableSender {
private outbox: Outbox
private inFlight = new Set<number>()
private windowSize: number
constructor(outbox: Outbox, windowSize = 8) {
this.outbox = outbox
this.windowSize = Math.max(1, windowSize)
}
sendBusiness<TPayload extends Record<string, unknown>>(ws: WsLike | null, payload: TPayload): OutboxItem {
// 永远“先入队再发送”:断线时也不会丢
const item = this.outbox.enqueue(payload)
this.tryFlush(ws)
return item
}
onAck(ack: ServerAck) {
// 只处理当前 client_id 的 ACK
if (ack.client_id !== this.outbox.clientId) return
// 推进 outbox 游标,并删除已确认消息
this.outbox.setLastAckedSeq(ack.ack_seq)
// 清理所有 seq <= ack_seq 的 inFlight(它们已经被服务端确认处理)
for (const seq of Array.from(this.inFlight)) {
if (seq <= ack.ack_seq) this.inFlight.delete(seq)
}
}
tryFlush(ws: WsLike | null) {
// 仅在连接 OPEN 时允许发送
if (!ws || ws.readyState !== 1) return
const pending = this.outbox.listPending()
for (const item of pending) {
// 避免重复发送同一条 seq(例如 ACK 还没到时多次触发 tryFlush)
if (this.inFlight.has(item.seq)) continue
// 窗口限流:达到窗口大小就暂停,等待 ACK 推进后再继续
if (this.inFlight.size >= this.windowSize) break
ws.send(item.json)
this.inFlight.add(item.seq)
}
}
}
逐段解释与自检要点:
-
windowSize:补发必须限流;否则断线积压几十/几百条,一次性全发会把 UI 与服务端打爆。 -
sendBusiness:业务侧永远调用它;即使断线,消息也会进入 outbox(不会丢)。 -
tryFlush:只在readyState === 1(OPEN)时发送;同时避免重复发送(inFlight)。 -
onAck:ACK 到达后推进游标并清理 inFlight;inFlight 用于“避免短时间内重复刷出同一条”。 -
在
12_websocket_resume/client/src/composables/useReliableReconnectingWebSocket.ts内:在ws.onopen中先发client.hello,再sender.tryFlush(ws);在ws.onmessage里解析event === 'server.ack'调用sender.onAck(ack)并继续补发; -
在
ws.onclose中:不清空 outbox;断线续传只依赖 ACK 推进来删除; -
在
12_websocket_resume/client/src/components/ReconnectPanel.vue中:业务发送统一调用sendReliable(payload)(底层自动入队/补发)。
四、服务端增量:ACK + 去重(保证幂等)
下面是 FastAPI WebSocket 的最小逻辑:记住每个 client_id 已处理到的最大 seq;收到重复 seq 就只回 ACK,不重复执行业务。
12_websocket_resume/server/app.py
import json
import time
from dataclasses import dataclass, field
from typing import Any
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
def now_ms() -> int:
# 统一毫秒时间戳(与前端协议字段 ts_ms 对齐)
return int(time.time() * 1000)
def make_ack(client_id: str, ack_seq: int, ack_msg_id: str | None) -> dict[str, Any]:
# server.ack:告诉客户端“我处理到哪个 seq 了”
# - ack_seq 是可靠性核心字段(客户端据此删除 outbox)
data: dict[str, Any] = {
"schema_version": "1.0.0",
"client_id": client_id,
"ts_ms": now_ms(),
"event": "server.ack",
"ack_seq": ack_seq,
}
if ack_msg_id is not None:
data["ack_msg_id"] = ack_msg_id
return data
def parse_json(text: str) -> dict[str, Any] | None:
# 容错解析:收到非 JSON 直接忽略
try:
data = json.loads(text)
except Exception:
return None
return data if isinstance(data, dict) else None
@dataclass
class ClientState:
# last_processed_seq:服务端确认“已按序处理完成”的最大 seq
# last_seen_at_ms:便于做清理/超时策略(课堂不展开)
last_processed_seq: int = 0
last_seen_at_ms: int = field(default_factory=now_ms)
CLIENTS: dict[str, ClientState] = {}
@app.websocket("/ws")
async def ws_endpoint(ws: WebSocket) -> None:
await ws.accept()
# 当前连接绑定的 client_id(收到 hello 或第一条 client.msg 后确定)
client_id: str | None = None
try:
while True:
text = await ws.receive_text()
data = parse_json(text)
if data is None:
continue
event = data.get("event")
if event == "client.hello":
# 重连后的第一件事:对齐游标
client_id = data.get("client_id")
if not isinstance(client_id, str) or client_id == "":
await ws.close(code=4401)
return
CLIENTS.setdefault(client_id, ClientState())
state = CLIENTS[client_id]
await ws.send_text(json.dumps(make_ack(client_id, state.last_processed_seq, None), ensure_ascii=False))
continue
if event != "client.msg":
continue
if client_id is None:
client_id = data.get("client_id")
if not isinstance(client_id, str) or client_id == "":
await ws.close(code=4401)
return
seq = data.get("seq")
msg_id = data.get("msg_id")
if not isinstance(seq, int) or seq < 1:
await ws.close(code=4400)
return
state = CLIENTS.setdefault(client_id, ClientState())
state.last_seen_at_ms = now_ms()
if seq <= state.last_processed_seq:
# 重复消息:只回 ACK,不重复执行业务逻辑(幂等)
await ws.send_text(json.dumps(make_ack(client_id, state.last_processed_seq, str(msg_id) if msg_id else None), ensure_ascii=False))
continue
if seq != state.last_processed_seq + 1:
# 跳号:说明中间缺消息;课堂最小实现为“回当前 ACK”,客户端会按序补发
await ws.send_text(json.dumps(make_ack(client_id, state.last_processed_seq, None), ensure_ascii=False))
continue
# 正常按序:处理本条消息,并推进服务端游标
state.last_processed_seq = seq
await ws.send_text(json.dumps(make_ack(client_id, state.last_processed_seq, str(msg_id) if msg_id else None), ensure_ascii=False))
except WebSocketDisconnect:
return
逐段解释与自检要点:
ClientState.last_processed_seq:服务端游标;幂等与续传的根。client.hello:重连后第一件事是对齐游标;服务端回一次 ACK,把“我处理到哪里”告诉客户端。seq <= last_processed_seq:重复消息直接回 ACK,不执行业务(避免重复执行)。
启动命令(复用既有工程习惯)
# 启动 FastAPI WebSocket 服务(开发模式)
uvicorn app:app --reload --port 8000
逐段解释与自检要点:
app:app:第一个 app 是文件名(app.py),第二个 app 是 FastAPI 实例名。--port 8000:与前端连接 URL 保持一致(例如ws://localhost:8000/ws)。
练习(至少 2 题)
- 为 Outbox 增加“手动清空”能力,但必须保留
last_acked_seq/next_seq的合理性。
提示:清空 items 不等于把 next_seq 归零;归零会导致重复 seq,引发服务端幂等误判。 - 把服务端的“缺口处理”从“拒绝处理”升级为“暂存乱序消息并等待缺口补齐”。
提示:设置上限(最多缓存 50 条乱序),超过上限直接丢弃并回当前 ACK,避免内存被打爆。
一、数据压缩(应用层):可协商、可降级、可观测
1) 前端:gzip 压缩/解压(优先用原生 CompressionStream,缺失则降级)
代码在配套工程中的推荐文件路径:
12_websocket_resume/client/src/utils/compression.ts
// CompressionAlg:应用层压缩算法枚举
export type CompressionAlg = 'none' | 'gzip'
// EncodedPayload:把“压缩后的二进制”包装为可 JSON 传输的结构
export type EncodedPayload = {
alg: CompressionAlg
b64: string
}
// bytes -> base64(浏览器环境)
function bytesToBase64(bytes: Uint8Array): string {
let binary = ''
const len = bytes.length
for (let i = 0; i < len; i++) binary += String.fromCharCode(bytes[i])
return btoa(binary)
}
// base64 -> bytes(浏览器环境)
function base64ToBytes(b64: string): Uint8Array {
const binary = atob(b64)
const bytes = new Uint8Array(binary.length)
for (let i = 0; i < binary.length; i++) bytes[i] = binary.charCodeAt(i)
return bytes
}
async function gzipCompress(text: string): Promise<Uint8Array> {
if (typeof CompressionStream === 'undefined') {
throw new Error('CompressionStream not supported')
}
const cs = new CompressionStream('gzip')
const input = new TextEncoder().encode(text)
// 注意:部分 TS lib 定义下,BlobPart 对 Uint8Array 的类型较挑剔
// 课堂工程用“拷贝到 ArrayBuffer”避免类型报错
const inputAb = new ArrayBuffer(input.byteLength)
new Uint8Array(inputAb).set(input)
const stream = new Blob([inputAb]).stream().pipeThrough(cs)
const outAb = await new Response(stream).arrayBuffer()
return new Uint8Array(outAb)
}
async function gzipDecompress(bytes: Uint8Array): Promise<string> {
if (typeof DecompressionStream === 'undefined') {
throw new Error('DecompressionStream not supported')
}
const ds = new DecompressionStream('gzip')
const inputAb = new ArrayBuffer(bytes.byteLength)
new Uint8Array(inputAb).set(bytes)
const stream = new Blob([inputAb]).stream().pipeThrough(ds)
const outAb = await new Response(stream).arrayBuffer()
return new TextDecoder().decode(outAb)
}
export async function encodeJsonWithCompression(obj: unknown, prefer: CompressionAlg): Promise<EncodedPayload> {
const text = JSON.stringify(obj)
// prefer 由 hello.cap 协商得到;如果不是 gzip 就直接返回 none
if (prefer !== 'gzip') return { alg: 'none', b64: bytesToBase64(new TextEncoder().encode(text)) }
try {
const gz = await gzipCompress(text)
return { alg: 'gzip', b64: bytesToBase64(gz) }
} catch {
// 不支持 gzip 或压缩失败时自动降级,保证功能可用
return { alg: 'none', b64: bytesToBase64(new TextEncoder().encode(text)) }
}
}
export async function decodeJsonWithCompression(payload: EncodedPayload): Promise<unknown> {
const bytes = base64ToBytes(payload.b64)
const text = payload.alg === 'gzip' ? await gzipDecompress(bytes) : new TextDecoder().decode(bytes)
return JSON.parse(text) as unknown
}
逐段解释与自检要点:
-
CompressionStream/DecompressionStream:浏览器原生 gzip;如果不支持就降级为不压缩。 -
btoa/atob:用于把二进制转为 base64 字符串;注意这会有额外膨胀,但整体仍常常比原始 JSON 小(尤其 payload 大且可压缩时)。 -
encodeJsonWithCompression:统一入口,业务侧只选prefer(由 hello.cap 协商得到)。 -
decodeJsonWithCompression:统一解码入口;收到的消息先 decode 再 JSON.parse。 -
对一个 50KB 的 JSON(包含重复字段/数组)进行 gzip 后,base64 字符串长度明显下降。
-
在不支持 CompressionStream 的环境中,仍能正常发送(alg 自动降级为 none)。
2) 后端:gzip/base64 解码(标准库实现)
代码在配套工程中的位置:
12_websocket_resume/server/app.py(函数decode_payload)
import base64
import gzip
import json
from typing import Any
def decode_payload(alg: str, b64: str) -> Any:
# 1) base64 文本 -> 原始 bytes
raw = base64.b64decode(b64.encode("utf-8"))
# 2) gzip 解压(若 alg != gzip 则视为未压缩)
if alg == "gzip":
raw = gzip.decompress(raw)
# 3) bytes -> utf-8 文本 -> JSON 对象
text = raw.decode("utf-8")
return json.loads(text)
逐段解释与自检要点:
base64.b64decode:把字符串还原为二进制。gzip.decompress:解压 gzip;若 alg 不是 gzip 则直接按 utf-8 解码。json.loads:解码后的结果是原始 JSON 对象,后续再走你已有的 Envelope 校验逻辑。
本讲只做“最常见风险的最低成本防线”,不把安全讲成“大工程”:
- 传输安全:生产环境必须用 WSS(TLS)
- 身份认证:浏览器 WebSocket 不能自定义 Header,常用做法是 URL query token 或 subprotocol
- 来源校验:检查 Origin(同源/白名单),避免被第三方网页滥用连接
- 限长与限频:限制单条消息大小、每秒消息数,避免被误用或打爆
1) 服务端:token 与 origin 检查(最小示例)
代码在配套工程中的位置:
12_websocket_resume/server/app.py(函数extract_token/is_allowed_origin)
from urllib.parse import parse_qs
def extract_token(query_string: str) -> str | None:
# query_string 形如 "token=devtoken&x=1"
qs = parse_qs(query_string)
token = qs.get("token", [None])[0]
# 返回非空字符串,否则返回 None
return token if isinstance(token, str) and token != "" else None
def is_allowed_origin(origin: str | None) -> bool:
# 浏览器 WebSocket 会自动带 Origin;非浏览器客户端可能没有
if origin is None:
return False
allowed = {"http://localhost:5173", "http://127.0.0.1:5173"}
return origin in allowed
逐段解释与自检要点:
推荐使用方式(伪代码表达意图):
- 连接建立时:
- 校验 Origin 不通过 → 直接 close
- 校验 token 不通过 → 直接 close
2) 限长:防止大消息直接把服务端内存顶爆
- 单条文本帧大小上限(例如 256KB):超过就 close 并提示
- Outbox 入队前先做大小估算:超过上限直接拒绝并给 UI 提示(不要把超大对象塞进队列)
前端大小估算(最小实现):
代码在配套工程中的推荐文件路径:
12_websocket_resume/client/src/utils/size.ts
// 输入一个字符串,估算它按 UTF-8 编码后的字节数(用于“限长”与“压缩阈值”判断)
export function estimateUtf8Bytes(text: string): number {
return new TextEncoder().encode(text).length
}
逐段解释与自检要点:
TextEncoder().encode:得到 UTF-8 字节数组,其 length 就是“近似真实网络字节数”。- 入队/发送前先估算:避免把超大 JSON 写进 localStorage 导致整个站点存储异常。
三、性能优化思路:先稳,再快(网络/解析/渲染三段)
把性能问题拆成三段定位:
- 网络段:消息太大/太频繁 → 压缩、合并、采样、丢弃策略
- 解析段:JSON.parse 太重 → 降频、分片、二进制(扩展方向)
- 渲染段:响应式更新太频繁 → 节流渲染、环形缓冲、只展示最新 N 条
1) 环形缓冲(只保留最近 N 条日志/消息)
代码在配套工程中的推荐文件路径:
12_websocket_resume/client/src/utils/ringBuffer.ts
// RingBuffer:只保留最近 N 条数据的“限长数组”
// 用途:日志窗口、最近消息列表(防止页面越跑越慢)
export class RingBuffer<T> {
private buf: T[] = []
private cap: number
constructor(capacity: number) {
// 至少 1 个容量
this.cap = Math.max(1, capacity)
}
push(x: T) {
this.buf.push(x)
if (this.buf.length > this.cap) {
// 超过容量时,丢弃最旧的若干条
this.buf.splice(0, this.buf.length - this.cap)
}
}
toArray(): T[] {
// 返回拷贝,避免外部修改内部数组
return this.buf.slice()
}
}
逐段解释与自检要点:
- 环形缓冲本质是“限长 list”;配合 UI 展示可以避免页面越跑越慢。
splice删除头部多余项;容量 N 建议按场景调(例如日志 200、消息 1000)。
2) UI 渲染节流:把 100 次更新变成 10 次(思路示例)
代码在配套工程中的推荐文件路径:
12_websocket_resume/client/src/utils/rafBatcher.ts
// createRafBatcher:把高频 push 合并到“下一帧”批量处理
// 目标:降低响应式更新次数,避免 100 条消息触发 100 次渲染
export function createRafBatcher<T>(apply: (batch: T[]) => void) {
let pending: T[] = []
let scheduled = false
return (x: T) => {
// 先缓存到 pending
pending.push(x)
if (scheduled) return
scheduled = true
requestAnimationFrame(() => {
scheduled = false
// 一帧内的所有数据合并成一个 batch
const batch = pending
pending = []
apply(batch)
})
}
}
逐段解释与自检要点:
- 高频消息不应每条都触发一次组件渲染;先缓存在
pending,每帧最多更新一次 UI。 apply(batch):把批次一次性写入响应式状态(例如 push 多条到 ring buffer),显著降低渲染压力。
练习(至少 2 题)
- 在 ReliableSender 上增加“失败重试次数限制”:同一 seq 如果连续发送失败超过 N 次,标记为 dead-letter 并停止补发。
提示:dead-letter 也要持久化,否则刷新后又会继续补发。 - 设计一套“压缩启用阈值”:小于 1KB 不压缩,大于 1KB 才压缩,并记录压缩前后字节数用于观察效果。
提示:记录字段可以写进日志,不必改协议。
- 断网期间发送 10 条业务消息:页面提示“已入队”,恢复后能按顺序补发成功
- 服务端重启:客户端自动重连后继续补发,且不会重复执行业务(服务端日志可证明)
- Outbox 有限长与过期:积压过大时会丢弃最旧消息并给出明确提示
建议的联调步骤(最小可复现)
- 打开页面并连接成功(保持上一讲的心跳与重连能力)。
- 断网(系统断网/浏览器离线)后连续触发业务消息 10 次。
- 恢复网络:观察 hello → ack → flush 补发 → ack 推进 的完整链路。
- 在补发过程中重启服务端:观察客户端重连后继续补发,最终 ack_seq 到达目标值。
大模型任务:AI 生成断线续传方案与代码(可直接复制使用)
把下面提示词复制给 AI,并要求它输出“协议 + 代码 + 自检点”,且必须与你的工程栈一致(Vue3 + TS + FastAPI):
你是资深全栈工程师。请为 WebSocket 实现“断线续传 + 消息缓存 + 去重幂等”方案,要求:
1) 客户端:Vue3 + TypeScript,提供 Outbox(持久化/限长/过期)、窗口限流补发、ACK 处理、重连后 hello 同步游标;
2) 服务端:FastAPI WebSocket,按 client_id 维护 last_processed_seq,支持重复消息去重并回 ACK;
3) 协议:schema_version、client_id、msg_id、ts_ms、event、seq、payload;包含 client.hello 与 server.ack;
4) 可靠性:断网/服务端重启/重复发送/乱序场景都要说明处理策略;
5) 输出:给出关键代码(TS 与 Python)与每段代码的自检点;
6) 禁止:不要引入额外第三方库(除非给出无库版本),不要省略关键字段。
校验点(你必须人工检查):
- AI 生成的 seq/ack_seq 是否单调递增且不会回退
- outbox 是否先入队再发送(避免断电丢失)
- 服务端是否对重复 seq 做幂等处理(只回 ACK,不重复执行业务)
- 是否存在无限增长的数据结构(队列、日志、缓存必须限长)
课后作业(布置)
- 实现自动重连 + 断线续传。
- 设计 3 个异常测试用例并验证通过。
- 撰写测试报告。
建议的 3 个异常用例(可直接用,也可自行替换):
- 用例 A:断网 60 秒期间发送 20 条消息,恢复后 30 秒内补发完成,ack_seq 最终一致
- 用例 B:补发过程中服务端重启 1 次,最终补发完成且服务端“业务执行次数 = 消息条数”(无重复)
- 用例 C:客户端刷新页面(F5)后继续补发未确认消息,最终补发完成(验证 outbox 持久化)
测试报告最小结构(不要求长,但必须可复现):
- 环境信息:前端/后端启动方式、关键版本、测试时间
- 用例描述:步骤、预期结果、实际结果
- 证据:截图/日志(至少包含 ack_seq 推进与 outbox 变化)
- 结论:是否通过;未通过说明原因与改进点
自检清单(提交前必做)
- Markdown:标题层级连续、代码块闭合、表格对齐、JSON/TS/Python 语法无明显错误
- 协议:event/seq/ack_seq 字段命名一致,client_id 口径一致
- 可靠性:outbox 限长 + 过期存在且生效;ack 推进不会回退
- 稳定性:断网/重启/刷新三种场景都能恢复,且不重复执行